Feat/add data publishing to stream processor #18
Conversation
pytrickle/client.py
```python
while not self.stop_event.is_set():
    # Wait for 333ms or until stop event is set
    try:
        await asyncio.wait_for(self.stop_event.wait(), timeout=0.333)
```
The timeout value should at least be a named constant, but I think it might be more beneficial as a configurable option.
I think configurable would be good as well. I added an option on StreamProcessor and TrickleClient that defaults to 333ms (3x as fast as a segment): a26d25c
Note that go-livepeer does not deserialize it; it just shuffles it along with a scanner.Scan() that reads until a newline character is found.
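Since go-livepeer only scans for newlines and forwards each line verbatim, the sender's only hard requirement is that a serialized payload contains no raw newline characters. A hedged sketch of that framing on the Python side (`frame_data_event` is a hypothetical helper, not part of pytrickle):

```python
import json

def frame_data_event(payload: dict) -> bytes:
    """Serialize a payload as one newline-terminated JSON line.

    Compact separators keep the output on a single line, so the
    relay's line scanner sees exactly one event per newline.
    """
    line = json.dumps(payload, separators=(",", ":"))
    return line.encode("utf-8") + b"\n"
```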
```python
except queue.Full:
    pass
```
Can we add a warning log line here for when the queue is full?
That is the output queue, right? Not the data queue. Happy to add that in a separate PR while also changing it to either an asyncio.Queue or a deque.
Ah, yes. It's a deque. Apparently this will not throw a queue.Full error, but we can log once the length hits maxlen: https://docs.python.org/3.10/library/collections.html?highlight=deque#collections.deque.
Sure, let's handle this in another PR.
I'm thinking we can add something like this, or go back to a regular queue if catching queue.Full is more performant:

```python
if len(self.data_queue) == self.data_queue.maxlen:
    print("Discarding message:", self.data_queue[0])
self.data_queue.append(new_item)
```
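A self-contained version of that idea, assuming a hypothetical `DataQueue` wrapper (not pytrickle's API) that logs a warning instead of printing. Note that `deque(maxlen=...)` never raises; it silently evicts the oldest element on append, so the check has to happen before the append:

```python
import logging
from collections import deque

logger = logging.getLogger("pytrickle")

class DataQueue:
    """Bounded deque that drops the oldest item when full, with a warning log.

    Illustrative sketch of the suggestion above, not a real pytrickle class.
    """
    def __init__(self, maxlen: int = 8):
        self._dq = deque(maxlen=maxlen)
        self.dropped = 0  # count of discarded messages, for visibility

    def put(self, item) -> None:
        if len(self._dq) == self._dq.maxlen:
            # The append below will evict self._dq[0], so warn first.
            self.dropped += 1
            logger.warning("data queue full; discarding oldest: %r", self._dq[0])
        self._dq.append(item)

    def get(self):
        return self._dq.popleft()
```

Compared to a `queue.Queue` with `put_nowait` and a `queue.Full` handler, this keeps the newest data and avoids exception overhead on the hot path, at the cost of silently preferring recency over completeness.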
Add basic plumbing to send data back through the trickle protocol.
Depends on go-livepeer PR livepeer/go-livepeer#3689.